package com.huan.hadoop.mr;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * MapReduce程序中Reduce阶段的处理类，对应着 ReduceTask
 * KEYIN: 表示的是reduce阶段的输入kv中k的类型，对应着map输出的key, 在本例子中是 单词 Text
 * VALUEIN: 表示的是reduce阶段的输入kv中v的类型，对应着map输出的value, 在本例子中是 单词次数1 LongWritable
 * KEYOUT: 表示的是reduce阶段的输出kv中k的类型，跟业务相关，本例子中是 单词 Text
 * VALUEOUT: 表示的是reduce阶段的输出kv中v的类型，跟业务相关，本例子中是 单词总次数 LongWritable
 *
 * @author huan.fu
 * @date 2023/7/8 - 11:38
 */
public class WordCountReducer extends Reducer<Text, LongWritable, Text, LongWritable> {

    private LongWritable wordCount = new LongWritable();

    /**
     * reduce阶段业务逻辑处理
     *
     * <pre>
     * 思考： 当map的所有输出数据来到reduce之后，该如何调用reduce方法进行处理呢？
     *       map阶段输出的数据： <hello,1> <hadoop,1> <hadoop,1> <hello,1> <hello,1>
     * 1、排序 规则：根据key的字典序进行排序a-z
     *    <hadoop,1> <hadoop,1> <hello,1> <hello,1> <hello,1>
     * 2、分组 规则：key相同的分为一组
     *    <hadoop,1> <hadoop,1>
     *    <hello,1> <hello,1> <hello,1>
     * 3、分组之后，同一组的数据组成一个新的kv键值对，调用一次reduce方法。 reduce方法基于分组调用的 一个分组调用一次
     *    同一组的数据组成一个新的kv键值对
     *    新key: 该组共同的key
     *    新value: 该组所有的value组成的一个迭代器Iterable
     *    <hadoop,1> <hadoop,1>             ---> <hadoop,Iterable[1,1]>
     *    <hello,1> <hello,1> <hello,1>     ---> <hello,Iterable[1,1,1]>
     * </pre>
     */
    @Override
    protected void reduce(Text key, Iterable<LongWritable> values, Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
        // 计算这个单词出现的次数
        long wordCnt = 0;
        for (LongWritable value : values) {
            wordCnt += value.get();
        }
        wordCount.set(wordCnt);
        // 输出结果
        context.write(key, wordCount);
    }
}
